package com.lyon.dmeo.storage.client.raft;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.lyon.demo.storage.client.api.core.command.request.HeartbeatRequest;
import com.lyon.demo.storage.client.api.core.command.request.RequestOrResponse;
import com.lyon.demo.storage.client.api.core.command.request.VoteRequest;
import com.lyon.demo.storage.client.api.core.command.response.HeartBeatResponse;
import com.lyon.demo.storage.client.api.core.command.response.VoteResponse;
import com.lyon.demo.storage.client.api.core.command.response.VoteResponse.VoteNextType;
import com.lyon.demo.storage.client.api.core.command.response.VoteResponse.VoteState;
import com.lyon.demo.storage.client.api.core.config.DLedgerConfig;
import com.lyon.demo.storage.client.api.core.protocol.core.Role;
import com.lyon.demo.storage.client.api.core.protocol.core.RoleChangeHandler;
import com.lyon.demo.storage.client.api.core.protocol.core.RoleChangeSupport;
import com.lyon.demo.storage.common.SystemClock;
import com.lyon.demo.storage.common.thread.StartupShutdownAble;
import com.lyon.demo.storage.common.util.TimeUtil;
import com.lyon.demo.storage.client.api.core.core.MemberState;
import com.lyon.dmeo.storage.client.raft.endpoint.RpcRemoteService;
import com.lyon.dmeo.storage.client.raft.core.ShutdownAbleThread;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.lyon.demo.storage.client.api.core.command.response.HeartBeatResponse.ResponseCode;

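/**
 * Leader-election state machine.
 * <ul>
 *   <li>leader: sends heartbeats to the other peers</li>
 *   <li>candidate: starts an election by requesting votes</li>
 *   <li>follower: receives heartbeats and switches to candidate when heartbeats stop arriving
 *       (for example because of a network failure)</li>
 * </ul>
 *
 * @author LeeYan9
 * @since 2022-05-19
 */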
@Slf4j
@SuppressWarnings({"ALL"})
@Getter
public class DLedgerLeaderElection implements RoleChangeSupport, StartupShutdownAble {

    /** Owning server; the collaborators below are obtained from it in the constructor. */
    private final DLedgerServer dLedgerServer;
    /** Configuration source; the timing fields below are re-read from it on every maintenance cycle. */
    private final DLedgerConfig dLedgerConfig;
    /** Shared role/term state of this node; also used as the monitor for all role transitions. */
    private final MemberState memberState;
    /** Transport used to send heartbeat and vote requests to the other peers. */
    private final RpcRemoteService rpcRemoteService;
    /** Listeners notified after every role change. */
    private final Set<RoleChangeHandler> roleChangeHandlers;
    /** Background worker that drives the state machine; started in startup() and stopped in shutdown(). */
    private final StateMaintainer stateMaintainer;
    /**
     * Interval between two heartbeat rounds (presumably milliseconds - confirm against DLedgerConfig).
     */
    private long heartBeatTimeInterval;
    /** Source of the random jitter added to the vote-response wait. */
    private Random random = new Random();

    /**
     * Heartbeat timestamp. As leader: time of the last heartbeat round that reached a quorum, or -1 to
     * force an immediate send. As follower: time the last heartbeat from the leader was accepted.
     */
    private long lastHeartbeatSendTime = -1;
    /** Number of heartbeat intervals that may elapse without a heartbeat before a new election is attempted. */
    private int maxHeartBeatLeak;
    /**
     * Upper bound of the randomized interval between two vote rounds.
     */
    private int maxVoteIntervalMs;
    /** Lower bound of the randomized interval between two vote rounds. */
    private int minVoteIntervalMs;

    /** Refreshed from the configuration; not read elsewhere in this class. */
    private long connectTimeout;
    /** Refreshed from the configuration; not read elsewhere in this class. */
    private long readTimeout;
    /** Maximum time, in milliseconds, to wait for heartbeat responses in one round. */
    private long heartbeatResponseTimeout;
    /** Base time, in milliseconds, to wait for vote responses in one round (random jitter is added). */
    private long voteResponseTimeout;

    /** Outcome of the previous vote round; decides whether the next round bumps the term first. */
    private VoteNextType lastVoteNextType = VoteNextType.WAIT_TO_REVOTE;
    /** Earliest timestamp at which the next vote round may start; -1 until first scheduled. */
    private long nextTimeToRequestVote = -1;
    /**
     * Set when a request with a higher term was seen: the next candidate cycle must increase the term
     * right away instead of waiting for nextTimeToRequestVote.
     */
    private boolean needIncreaseTermImmediately = false;

    /**
     * Creates the election component for the given server and wires up its collaborators.
     * The maintenance thread is created here but only started by {@link #startup()}.
     *
     * @param dLedgerServer server that supplies the configuration, member state and RPC service
     */
    public DLedgerLeaderElection(DLedgerServer dLedgerServer) {
        this.dLedgerServer = dLedgerServer;
        this.dLedgerConfig = dLedgerServer.getDLedgerConfig();
        this.memberState = dLedgerServer.getMemberState();
        this.rpcRemoteService = dLedgerServer.getRpcRemoteService();
        this.stateMaintainer = new StateMaintainer();
        this.roleChangeHandlers = new ConcurrentHashSet<>();
    }

    /**
     * Starts the background thread that maintains the election state.
     */
    @Override
    public void startup() {
        this.stateMaintainer.start();
    }

    /**
     * Registers a listener that is invoked after every role change of this node.
     *
     * @param handler listener to add
     */
    @Override
    public void addRoleChangeHandler(RoleChangeHandler handler) {
        this.roleChangeHandlers.add(handler);
    }

    /**
     * Stops the background thread that maintains the election state.
     */
    @Override
    public void shutdown() {
        this.stateMaintainer.shutdown();
    }

    /**
     * 状态维护者
     */
    @SuppressWarnings("ConvertToBasicLatin")
    class StateMaintainer extends ShutdownAbleThread {


        /**
         * Creates a maintainer that logs through the enclosing class's logger.
         */
        public StateMaintainer() {
            super(log);
        }

        /**
         * Creates a maintainer that logs through the supplied logger.
         *
         * @param logger logger handed to the parent thread class
         */
        public StateMaintainer(Logger logger) {
            super(logger);
        }

        /**
         * One maintenance cycle: re-read the timing settings from the configuration, then run
         * the step that matches the node's current role.
         */
        @Override
        protected void doWork() {
            // Pick up the latest intervals/timeouts before acting on them.
            refreshIntervals(dLedgerConfig);
            // Act according to the current role.
            maintainState();
        }

        /**
         * Dispatches to the role-specific maintenance routine; logs an error when the member
         * state reports none of the three known roles.
         */
        private void maintainState() {
            if (memberState.isLeader()) {
                maintainStateAsLeader();
                return;
            }
            if (memberState.isCandidate()) {
                maintainStateAsCandidate();
                return;
            }
            if (memberState.isFollower()) {
                maintainStateAsFollower();
                return;
            }
            log.error("[BUG] state error");
        }

        /**
         * Leader step: sends a heartbeat round when none has been sent yet or when the heartbeat
         * interval has elapsed since the last recorded one.
         */
        private void maintainStateAsLeader() {
            // A heartbeat was already sent and the interval has not elapsed yet: nothing to do.
            if (lastHeartbeatSendTime >= 0 && TimeUtil.elapsed(lastHeartbeatSendTime) < heartBeatTimeInterval) {
                return;
            }
            final long term;
            final String leaderId;
            // Take a consistent snapshot of term and leader under the member-state lock.
            synchronized (memberState) {
                if (!memberState.isLeader()) {
                    return;
                }
                term = memberState.getCurrTerm();
                leaderId = memberState.getLeaderId();
                // A leader must be its own leader; anything else is an inconsistent state.
                if (!leaderId.equals(memberState.getSelfId())) {
                    throw new RuntimeException("selfId<> LeaderId");
                }
            }
            try {
                log.info("send heartbeat.. from leader [{}] to followers {}", memberState.getSelfId(),memberState.getPeerIdsWithoutSelf());
                sendHeartBeats(term, leaderId);
            } catch (Exception e) {
                log.error("心跳发送异常[BUG]", e);
            }
        }

        /**
         * Candidate step: runs one vote round when it is due.
         * <ol>
         *   <li>Under the member-state lock, optionally bump the term (when forced by
         *       {@code needIncreaseTermImmediately} or when the previous round ended with
         *       {@code WAIT_TO_VOTE_NEXT}) and snapshot term and ledger position. A forced bump
         *       only schedules the next round and returns without voting.</li>
         *   <li>Request votes from every peer (including this node) and tally the responses until a
         *       decision is possible, all responses arrived, or the wait times out.</li>
         *   <li>Derive the next action from the tally: adopt a higher term, back off because a leader
         *       exists, back off because peers have a longer log, become leader, revote immediately,
         *       or start a new round with a higher term.</li>
         * </ol>
         */
        @SneakyThrows
        private void maintainStateAsCandidate() {
            // Not yet time for the next round and no forced term bump pending.
            if (SystemClock.now() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
                return;
            }
            long ledgerEndTerm;
            long ledgerEndIndex;
            long currTerm;
            String leaderId;
            synchronized (memberState) {
                if (!memberState.isCandidate()) {
                    return;
                }
                if (needIncreaseTermImmediately || VoteNextType.WAIT_TO_VOTE_NEXT == lastVoteNextType) {
                    long preTerm = memberState.getCurrTerm();
                    currTerm = memberState.nextTerm();
                    lastVoteNextType = VoteNextType.WAIT_TO_REVOTE;
                    log.info("{} 任期编号增加{} to {}", memberState.getSelfId(), preTerm, currTerm);
                } else {
                    currTerm = memberState.getCurrTerm();
                }
                ledgerEndTerm = memberState.getLedgerEndTerm();
                ledgerEndIndex = memberState.getLedgerEndIndex();
                leaderId = memberState.getSelfId();

                if (needIncreaseTermImmediately) {
                    // The term was just raised on demand; vote in a later cycle.
                    needIncreaseTermImmediately = false;
                    nextTimeToRequestVote = getNextTimeToRequestVote();
                    log.info("[{}] ，下次投票时间戳[{}]", memberState.getSelfId(), nextTimeToRequestVote);
                    return;
                }
            }
            log.info("[{}]准备发起投票..", memberState.getSelfId());
            List<CompletableFuture<VoteResponse>> voteResponses = voteForQuorumResponses(currTerm);
            AtomicInteger allNum = new AtomicInteger(0);
            AtomicInteger acceptedNum = new AtomicInteger(0);
            AtomicInteger notReadyTermNum = new AtomicInteger(0);
            AtomicLong knownMaxTermInGroup = new AtomicLong(currTerm);
            AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
            AtomicInteger biggerIndexNum = new AtomicInteger(0);
            CountDownLatch voteLatch = new CountDownLatch(1);
            List<VoteResponse> backup = new CopyOnWriteArrayList<>();
            for (CompletableFuture<VoteResponse> future : voteResponses) {
                future.whenComplete((voteResponse, ex) -> {
                    try {
                        if (ex != null) {
                            throw ex;
                        }
                        backup.add(voteResponse);
                        VoteState state = voteResponse.getVoteState();
                        Assert.notNull(state, "投票状态不能为空");
                        switch (state) {
                            // Vote granted.
                            case ACCEPT:
                                acceptedNum.incrementAndGet();
                                break;
                            // Plain rejections that need no bookkeeping: unknown leader id, failed
                            // request, unexpected leader (vote request routed back to the candidate
                            // over RPC), already voted, leadership being taken over.
                            case REJECT_UNKNOWN_LEADER:
                            case UNKNOWN:
                            case REJECT_UNEXPECTED_LEADER:
                            case REJECT_ALREADY_VOTED:
                            case REJECT_TAKING_LEADERSHIP:
                                break;
                            // NOTE(review): REJECT_TERM_SMALL_THAN_LEDGER falls through and is treated
                            // like "a leader already exists"; confirm this is intended rather than
                            // feeding the returned term into knownMaxTermInGroup.
                            case REJECT_TERM_SMALL_THAN_LEDGER:
                                // A leader already exists for the current term.
                            case REJECT_ALREADY_HAS_LEADER:
                                alreadyHasLeader.set(true);
                                break;
                            // The peer is on a higher term: remember the highest one seen so far.
                            // A running max keeps a later, smaller response from overwriting it.
                            case REJECT_TERM_EXPIRED:
                                knownMaxTermInGroup.accumulateAndGet(voteResponse.getTerm(), Math::max);
                                break;
                            // The peer's log is ahead of ours (newer ledger term or larger end index).
                            case REJECT_EXPIRED_LEDGER_TERM:
                            case REJECT_SMALL_LEDGER_END_INDEX:
                                biggerIndexNum.incrementAndGet();
                                break;
                            // The peer's term was behind ours; it has to catch up before it can vote.
                            case REJECT_TERM_NOT_READY:
                                notReadyTermNum.incrementAndGet();
                                break;
                            default:
                                log.error("[BUG]投票过程中,发生意料之外的处理-[{}]", JSONUtil.toJsonStr(voteResponse));
                                break;
                        }
                        // Release the waiter as soon as the outcome can no longer change materially.
                        if (alreadyHasLeader.get()
                                || memberState.isQuorum(acceptedNum.get())
                                || memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
                            voteLatch.countDown();
                        }

                    } catch (Throwable throwable) {
                        log.error("投票时发生异常", throwable);
                    } finally {
                        // Every response, failed or not, counts towards "all peers answered".
                        if (allNum.incrementAndGet() == voteResponses.size()) {
                            voteLatch.countDown();
                        }
                    }
                });
            }
            try {
                voteLatch.await(voteResponseTimeout + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the owning thread so a shutdown request is not lost.
                Thread.currentThread().interrupt();
                log.error("投票请求等待时，发生异常", interrupted);
            } catch (Exception e) {
                log.error("投票请求等待时，发生异常", e);
            }
            log.info("[{}-{}] 投票结果查看 {}", currTerm, memberState.getSelfId(), JSONUtil.toJsonStr(backup));
            VoteNextType voteNextType = lastVoteNextType;
            if (knownMaxTermInGroup.get() > currTerm) {
                // A higher term exists in the group: adopt it and run a new round with a bumped term.
                log.debug("stateChangeToCandidate1");
                stateChangeToCandidate(knownMaxTermInGroup.get());
                voteNextType = VoteNextType.WAIT_TO_VOTE_NEXT;
                nextTimeToRequestVote = getNextTimeToRequestVote();
            } else if (alreadyHasLeader.get()) {
                // A leader already exists: back off long enough for its heartbeats to arrive.
                log.debug("stateChangeToCandidate2");
                stateChangeToCandidate(currTerm);
                nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeInterval + maxHeartBeatLeak;
            } else if (!memberState.isQuorum(allNum.get() - biggerIndexNum.get())) {
                // Too many peers have a longer log: wait an extra interval before trying again.
                nextTimeToRequestVote = getNextTimeToRequestVote() + maxVoteIntervalMs;
            } else if (memberState.isQuorum(acceptedNum.get())) {
                // A quorum granted the vote: become leader below.
                voteNextType = VoteNextType.PASSED;
            } else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
                // A quorum would vote once their term has caught up: revote right away.
                voteNextType = VoteNextType.REVOTE_IMMEDIATELY;
            } else {
                // No quorum for any reason: start a new round with a higher term later.
                voteNextType = VoteNextType.WAIT_TO_VOTE_NEXT;
                nextTimeToRequestVote = getNextTimeToRequestVote();

            }
            if (voteNextType == VoteNextType.PASSED) {
                log.info("选举Leader成功 leaderId:{} term:{} ledgerEndTerm:{} ledgerEndIndex:{}",
                        leaderId, currTerm, ledgerEndTerm, ledgerEndIndex);
                stateChangeToLeader(currTerm);
            }
            lastVoteNextType = voteNextType;
            log.info("[{}] 投票完成后，下次投票类型[{}] 时间戳[{}]", memberState.getSelfId(), voteNextType, nextTimeToRequestVote);
        }

        /**
         * Follower step: when no heartbeat has been recorded for more than {@code maxHeartBeatLeak}
         * heartbeat intervals, switches this node to candidate at its current term.
         */
        private void maintainStateAsFollower() {
            // Cheap pre-check outside the lock: less than two intervals of silence is still fine.
            if (TimeUtil.elapsed(lastHeartbeatSendTime) <= 2 * heartBeatTimeInterval) {
                return;
            }
            synchronized (memberState) {
                if (!memberState.isFollower()) {
                    return;
                }
                // Tolerated number of missed heartbeats not exceeded yet.
                if (TimeUtil.elapsed(lastHeartbeatSendTime) <= maxHeartBeatLeak * heartBeatTimeInterval) {
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("stateChangeToCandidate3");
                }
                stateChangeToCandidate(memberState.getCurrTerm());
            }
        }

        /**
         * Copies the current timing settings from the configuration into the enclosing instance.
         *
         * @param dLedgerConfig configuration to read from
         */
        private void refreshIntervals(DLedgerConfig dLedgerConfig) {
            // Heartbeat related settings.
            heartBeatTimeInterval = dLedgerConfig.getHeartBeatTimeInterval();
            heartbeatResponseTimeout = dLedgerConfig.getHeartBeatTimeout();
            maxHeartBeatLeak = dLedgerConfig.getMaxHeartBeatLeak();
            // Vote related settings.
            voteResponseTimeout = dLedgerConfig.getVoteResponseTimeout();
            minVoteIntervalMs = dLedgerConfig.getMinVoteIntervalMs();
            maxVoteIntervalMs = dLedgerConfig.getMaxVoteIntervalMs();
            // Transport timeouts.
            connectTimeout = dLedgerConfig.getConnectTimeout();
            readTimeout = dLedgerConfig.getReadTimeout();
        }

        /**
         * Sends one heartbeat round from the leader to every other peer and reacts to the result.
         * <p>
         * The method waits until a quorum answered, every request completed, or
         * {@code heartbeatResponseTimeout} elapsed. Afterwards:
         * <ul>
         *   <li>quorum of successes: record the send time;</li>
         *   <li>quorum of successes plus "term not ready": resend in the next cycle;</li>
         *   <li>a peer reported another leader for the same term: step down to candidate;</li>
         *   <li>a peer reported an expired term: step down to candidate with the highest term seen;</li>
         *   <li>no quorum for longer than {@code maxHeartBeatLeak} intervals: step down to candidate;</li>
         *   <li>otherwise: resend in the next cycle.</li>
         * </ul>
         * NOTE(review): {@code succNum} starts at 0, so the leader does not count itself towards the
         * quorum; confirm this matches the semantics of {@code MemberState.isQuorum}.
         *
         * @param term     term the heartbeat is sent for
         * @param leaderId id of the sending leader
         */
        @SuppressWarnings({"AlibabaMethodTooLong", "StreamToLoop"})
        public void sendHeartBeats(long term, String leaderId) {
            String selfId = memberState.getSelfId();
            //TODO ledgerEndTerm ?
            AtomicLong maxTerm = new AtomicLong(-1);
            AtomicInteger succNum = new AtomicInteger(0);
            AtomicInteger notReadyNum = new AtomicInteger(0);
            AtomicBoolean inconsistentLeader = new AtomicBoolean(false);
            CountDownLatch sendCompletedLatch = new CountDownLatch(1);
            List<HeartBeatResponse> backup = new CopyOnWriteArrayList<>();
            // Outstanding responses plus one sentinel held by this method until every request has
            // been issued, so the latch cannot be released while requests are still being sent.
            AtomicInteger pendingResponses = new AtomicInteger(1);
            memberState.getPeersWithoutSelf().forEach((peerId, address) -> {
                HeartbeatRequest request = new HeartbeatRequest();
                request.setRemoteId(peerId);
                request.setLocalId(selfId);
                request.setTerm(term);
                request.setLeaderId(leaderId);
                CompletableFuture<HeartBeatResponse> future = rpcRemoteService.heartBeat(request);
                pendingResponses.incrementAndGet();
                future.whenComplete((heartBeatResponse, throwable) -> {
                    try {
                        if (throwable != null) {
                            // A failed request carries no response to evaluate.
                            log.error("发送心跳失败", throwable);
                            return;
                        }
                        backup.add(heartBeatResponse);
                        switch (heartBeatResponse.getResponseCode()) {
                            case SUCCESS:
                                succNum.incrementAndGet();
                                break;
                            case TERM_NOT_READY:
                                notReadyNum.incrementAndGet();
                                break;
                            case TERM_EXPIRED:
                                // Keep the highest term reported by any peer.
                                maxTerm.accumulateAndGet(heartBeatResponse.getTerm(), Math::max);
                                break;
                            case INCONSISTENT_LEADER:
                                inconsistentLeader.compareAndSet(false, true);
                                break;
                            case UNKNOWN_ERROR:
                                log.error("请求id[]遇见未知异常..");
                                break;
                            default:
                                log.error("请求id[]遇见未知错误..");
                                break;
                        }
                        if (memberState.isQuorum(succNum.get()) || memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                            sendCompletedLatch.countDown();
                        }
                    } catch (Exception e) {
                        log.error("请求id[] 处理心跳回执失败..", e);
                    } finally {
                        // Last outstanding response: nothing more to wait for.
                        if (pendingResponses.decrementAndGet() == 0) {
                            sendCompletedLatch.countDown();
                        }
                    }
                });
            });
            // Drop the sentinel; releases the latch immediately when every response is already in
            // (or when there was no peer to contact).
            if (pendingResponses.decrementAndGet() == 0) {
                sendCompletedLatch.countDown();
            }
            try {
                //noinspection ResultOfMethodCallIgnored
                sendCompletedLatch.await(heartbeatResponseTimeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the owning thread so a shutdown request is not lost.
                Thread.currentThread().interrupt();
                lastHeartbeatSendTime = -1;
                log.error(CharSequenceUtil.format("请求id[] 处理心跳回执失败.."), e);
                return;
            }
            log.debug("[{}] 心跳结果[{}]", memberState.getSelfId(), JSONUtil.toJsonStr(backup));
            // A quorum of followers acknowledged the heartbeat.
            if (memberState.isQuorum(succNum.get())) {
                lastHeartbeatSendTime = SystemClock.now();
                return;
            }
            if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                // Some peers are still electing or are lagging on a lower term: resend until a
                // quorum of plain successes is reached.
                lastHeartbeatSendTime = -1;
            } else if (inconsistentLeader.get()) {
                // Two leaders with the same term; should not happen, step down to be safe.
                log.debug("stateChangeToCandidate4");
                stateChangeToCandidate(term);
            } else if (maxTerm.get() > -1) {
                // A peer is on a higher term: step down and rejoin the election with that term.
                log.debug("stateChangeToCandidate5");
                stateChangeToCandidate(maxTerm.get());
            } else if (TimeUtil.elapsed(lastHeartbeatSendTime) > maxHeartBeatLeak * heartBeatTimeInterval) {
                // No quorum for too long: give up leadership.
                log.debug("stateChangeToCandidate6");
                stateChangeToCandidate(term);
            } else {
                lastHeartbeatSendTime = -1;
                log.error("请求id[] 网络通信异常..");
            }
        }
    }

    /**
     * Issues a vote request for the given term to every member of the peer map.
     * The request addressed to this node itself is handled locally instead of going through RPC.
     *
     * @param term term the vote is requested for
     * @return one pending response per peer, in peer-map iteration order
     */
    private List<CompletableFuture<VoteResponse>> voteForQuorumResponses(long term) {
        List<CompletableFuture<VoteResponse>> pendingVotes = new ArrayList<>();
        memberState.getPeerMap().forEach((peerId, address) -> {
            // Build the request; this node presents itself as the leader to vote for.
            VoteRequest request = buildRequestOrResponse(peerId, VoteRequest::new, draft -> {
                draft.setLeaderEndTerm(memberState.getLedgerEndTerm());
                draft.setLeaderId(memberState.getSelfId());
                draft.setLedgerEndIndex(memberState.getLedgerEndIndex());
                draft.setTerm(term);
            });
            boolean addressedToSelf = peerId.equals(memberState.getSelfId());
            CompletableFuture<VoteResponse> pending = addressedToSelf
                    ? handleVote(request, true)
                    : rpcRemoteService.vote(request);
            pendingVotes.add(pending);
        });
        return pendingVotes;
    }

    /**
     * Decides whether this node grants its vote to the candidate named in the request.
     * The whole decision runs under the member-state lock. Checks are applied in this order:
     * <ol>
     *   <li>the candidate must be a known peer;</li>
     *   <li>a remote request must not name this node as the candidate;</li>
     *   <li>the candidate's log must be at least as up to date as the local one
     *       (ledger end term first, then ledger end index);</li>
     *   <li>the request term must equal the local term; a lower term is rejected, a higher term
     *       makes this node a candidate at that term and asks the caller to retry;</li>
     *   <li>within the same term this node must not have voted for somebody else;</li>
     *   <li>the request term must not be below the local ledger end term.</li>
     * </ol>
     *
     * @param request vote request
     * @param self    {@code true} when the request was issued locally by this node for itself
     * @return an already completed future holding the vote decision
     */
    public CompletableFuture<VoteResponse> handleVote(VoteRequest request, boolean self) {
        synchronized (memberState) {
            long term = memberState.getCurrTerm();
            long ledgerEndTerm = memberState.getLedgerEndTerm();

            if (!memberState.isPeerMember(request.getLeaderId())) {
                // The candidate is not part of this group.
                return CompletableFuture.completedFuture(buildVote(request, term, VoteState.REJECT_UNKNOWN_LEADER));
            }
            if (!self && memberState.getSelfId().equals(request.getLeaderId())) {
                // A remote request must never ask this node to vote for itself.
                return CompletableFuture.completedFuture(buildVote(request, term, VoteState.REJECT_UNEXPECTED_LEADER));
            }

            if (ledgerEndTerm > request.getLeaderEndTerm()) {
                // The candidate's last log entry is from an older term than ours.
                return CompletableFuture.completedFuture(buildVote(request, ledgerEndTerm, VoteState.REJECT_EXPIRED_LEDGER_TERM));
            } else if (ledgerEndTerm == request.getLeaderEndTerm()
                    && memberState.getLedgerEndIndex() > request.getLedgerEndIndex()) {
                // Same last term, but the candidate's log is shorter than ours.
                return CompletableFuture.completedFuture(buildVote(request, term, VoteState.REJECT_SMALL_LEDGER_END_INDEX));
            }

            if (term > request.getTerm()) {
                return CompletableFuture.completedFuture(buildVote(request, term, VoteState.REJECT_TERM_EXPIRED));
            } else if (term == request.getTerm()) {
                String votedFor = memberState.getCurrVoteFor();
                // No vote cast yet, or a repeated request from the candidate we already voted
                // for: fall through to the final checks. Anything else is a rejection.
                if (Objects.nonNull(votedFor) && !votedFor.equals(request.getLeaderId())) {
                    if (Objects.isNull(memberState.getLeaderId())) {
                        // Already voted for another candidate in this term.
                        return CompletableFuture.completedFuture(buildVote(request, term, VoteState.REJECT_ALREADY_VOTED));
                    } else {
                        // Already following another leader in this term.
                        return CompletableFuture.completedFuture(buildVote(request, term, VoteState.REJECT_ALREADY_HAS_LEADER));
                    }
                }
            } else {
                // Votes are only granted when both sides are on the same term: catch up first
                // and let the candidate ask again.
                stateChangeToCandidate(request.getTerm());
                needIncreaseTermImmediately = true;
                return CompletableFuture.completedFuture(buildVote(request, term, VoteState.REJECT_TERM_NOT_READY));
            }
            if (request.getTerm() < ledgerEndTerm) {
                return CompletableFuture.completedFuture(buildVote(request, ledgerEndTerm, VoteState.REJECT_TERM_SMALL_THAN_LEDGER));
            }
            memberState.setCurrVoteFor(request.getLeaderId());
            return CompletableFuture.completedFuture(buildVote(request, term, VoteState.ACCEPT));
        }
    }

    /**
     * Builds a vote response for the given request.
     *
     * @param request   request being answered; its properties are copied into the response
     * @param currTerm  term to report back
     * @param voteState decision to report back
     * @return the populated response
     */
    private VoteResponse buildVote(VoteRequest request, long currTerm, VoteState voteState) {
        Consumer<VoteResponse> decision = reply -> {
            reply.setVoteState(voteState);
            reply.setTerm(currTerm);
        };
        return buildResponse(request, VoteResponse::new, decision);
    }

    /**
     * Computes the timestamp of the next vote round: now plus a delay drawn from
     * {@code [minVoteIntervalMs, maxVoteIntervalMs)}.
     * <p>
     * When the configured maximum is not greater than the minimum, no jitter is added and the
     * delay is exactly {@code minVoteIntervalMs}; the random helper rejects a non-positive bound.
     *
     * @return timestamp, on the {@code SystemClock} time base, at which the next vote may start
     */
    public long getNextTimeToRequestVote() {
        int spread = maxVoteIntervalMs - minVoteIntervalMs;
        long jitter = spread > 0 ? RandomUtil.randomInt(spread) : 0;
        return SystemClock.now() + minVoteIntervalMs + jitter;
    }

    /**
     * Creates a message pre-filled with this node's view of the group and lets the caller
     * customise it afterwards.
     *
     * @param remoteId id of the peer the message is addressed to
     * @param supplier factory for the concrete message type
     * @param consumer customisation applied last, so it may override the pre-filled values
     * @param <T>      concrete message type
     * @return the populated message
     */
    private <T extends RequestOrResponse> T buildRequestOrResponse(String remoteId, Supplier<T> supplier, Consumer<T> consumer) {
        T message = supplier.get();
        String knownLeader = memberState.getLeaderId();
        // Without a known leader this node names itself.
        if (knownLeader == null) {
            message.setLeaderId(memberState.getSelfId());
        } else {
            message.setLeaderId(knownLeader);
        }
        message.setLocalId(memberState.getSelfId());
        message.setRemoteId(remoteId);
        message.setTerm(memberState.getCurrTerm());
        consumer.accept(message);
        return message;
    }

    /**
     * Creates a response whose properties are copied from the request and then optionally customised.
     *
     * @param request  request being answered
     * @param supplier factory for the concrete response type
     * @param consumer optional customisation applied after the copy; may be {@code null}
     * @param <T>      concrete response type
     * @return the populated response
     */
    private <T extends RequestOrResponse> T buildResponse(RequestOrResponse request, Supplier<T> supplier, Consumer<T> consumer) {
        T reply = supplier.get();
        BeanUtil.copyProperties(request, reply);
        if (consumer == null) {
            return reply;
        }
        consumer.accept(reply);
        return reply;
    }

    /**
     * Handles a heartbeat received from a leader.
     * <p>
     * The response reports this node's current term rather than echoing the request's term, so a
     * leader that receives {@code TERM_EXPIRED} learns which term it has to catch up to.
     *
     * @param request heartbeat request; it, its remote id and its leader id must not be {@code null}
     * @return a completed future with the response, or a failed future when the sender is not a
     *         valid leader for this node or processing fails
     * @throws IllegalArgumentException when one of the {@code null} checks on the request fails
     */
    public CompletableFuture<HeartBeatResponse> handleHeartbeat(HeartbeatRequest request) {
        Assert.notNull(request, "心跳请求信息为空");
        Assert.notNull(request.getRemoteId(), "当前不是对应远端的客户端");
        Assert.notNull(request.getLeaderId(), "Leader不能为空");
        try {
            String leaderId = request.getLeaderId();
            Assert.isTrue(memberState.isPeerMember(leaderId), "请求id[]-当前组员外的Leader在发送心跳请求");
            Assert.isTrue(!memberState.getSelfId().equals(leaderId), "请求id[]-Leader发送心跳请求给自己");

            final ResponseCode responseCode = doHandleHeartbeat(request, leaderId);

            HeartBeatResponse response = buildResponse(request, HeartBeatResponse::new, heartBeatResponse -> {
                heartBeatResponse.setResponseCode(responseCode);
                // The bean copy carried over the leader's term; report the local term instead.
                heartBeatResponse.setTerm(memberState.getCurrTerm());
            });
            log.debug("心跳结果返回：[{}]", JSONUtil.toJsonStr(response));
            return CompletableFuture.completedFuture(response);
        } catch (Exception ex) {
            log.error(CharSequenceUtil.format("请求id[]-处理心跳失败"), ex);
            return CompletableFuture.failedFuture(ex);
        }
    }

    /**
     * Evaluates a heartbeat against the local term and leader and updates the local state.
     * A lock-free fast path answers the two common cases (stale term, heartbeat from the known
     * leader); everything else is re-evaluated under the member-state lock.
     *
     * @param request  heartbeat request
     * @param leaderId id of the sending leader
     * @return the response code to send back
     */
    private ResponseCode doHandleHeartbeat(HeartbeatRequest request, String leaderId) {
        // Fast path without the lock.
        if (request.getTerm() < memberState.getCurrTerm()) {
            return ResponseCode.TERM_EXPIRED;
        }
        if (request.getTerm() == memberState.getCurrTerm()
                && memberState.getLeaderId() != null
                && memberState.getLeaderId().equals(leaderId)) {
            lastHeartbeatSendTime = SystemClock.now();
            return ResponseCode.SUCCESS;
        }
        // Slow path: decide again under the lock.
        synchronized (memberState) {
            if (request.getTerm() < memberState.getCurrTerm()) {
                return ResponseCode.TERM_EXPIRED;
            }
            if (request.getTerm() == memberState.getCurrTerm()) {
                if (memberState.getLeaderId() == null) {
                    // Heartbeats are only sent after a quorum voted, so a node without a leader
                    // in the same term simply adopts the sender as leader.
                    lastHeartbeatSendTime = SystemClock.now();
                    stateChangeToFollower(request.getTerm(), leaderId);
                    return ResponseCode.SUCCESS;
                }
                if (memberState.getLeaderId().equals(leaderId)) {
                    // Already following this leader.
                    lastHeartbeatSendTime = SystemClock.now();
                    return ResponseCode.SUCCESS;
                }
                // Same term but a different leader: should not happen, a term has one leader.
                log.warn("请求id[BUG] 相同的term下leader有多个");
                return ResponseCode.INCONSISTENT_LEADER;
            }
            // The sender is on a higher term: success may only be reported for equal terms,
            // so move to that term as candidate and bump the term in the next cycle.
            if (log.isDebugEnabled()) {
                log.info("stateChangeToCandidate7");
            }
            stateChangeToCandidate(request.getTerm());
            needIncreaseTermImmediately = true;
            return ResponseCode.TERM_NOT_READY;
        }
    }

    /**
     * Switches this node to candidate at the given term and notifies the role-change handlers.
     * A term lower than the current one is refused and only logged.
     *
     * @param term term to become candidate at
     */
    private void stateChangeToCandidate(long term) {
        synchronized (memberState) {
            if (term < memberState.getCurrTerm()) {
                log.error("DEBUG 错误的term");
                return;
            }
            log.info("[{}]changeToCandidate currTerm:{} term:{}", memberState.getSelfId(), memberState.getCurrTerm(), term);
            memberState.changeToCandidate(term);
            postProcessOfRoleChange(Role.CANDIDATE);
        }
    }

    /**
     * Switches this node to leader for the given term and notifies the role-change handlers.
     * The heartbeat timestamp is reset so the first heartbeat goes out in the next cycle.
     *
     * @param term term to lead; must equal the current term
     */
    private void stateChangeToLeader(long term) {
        synchronized (memberState) {
            boolean sameTerm = term == memberState.getCurrTerm();
            Assert.isTrue(sameTerm, "任期编号必须等于当前任期");
            // -1 makes the leader step send a heartbeat right away.
            lastHeartbeatSendTime = -1;
            log.info("[{}]stateChangeToLeader currTerm:{} term:{}", memberState.getSelfId(), memberState.getCurrTerm(), term);
            memberState.changeToLeader(term);
            postProcessOfRoleChange(Role.LEADER);
        }
    }


    /**
     * Switches this node to follower of the given leader and notifies the role-change handlers.
     * Also resets the vote bookkeeping and records the current time as the last heartbeat.
     *
     * @param term     term of the leader; must not be lower than the current term
     * @param leaderId id of the leader to follow; must not be blank
     */
    private void stateChangeToFollower(long term, String leaderId) {
        synchronized (memberState) {
            boolean termNotBehind = term >= memberState.getCurrTerm();
            Assert.isTrue(termNotBehind, "任期编号必须大于等于当前任期");
            Assert.notBlank(leaderId, "leaderId不能为空");
            lastVoteNextType = VoteNextType.WAIT_TO_REVOTE;
            lastHeartbeatSendTime = SystemClock.now();
            log.info("[{}] stateChangeToFollower leadeId[{}] currTerm:{} term:{}",
                    memberState.getSelfId(), leaderId, memberState.getCurrTerm(), term);
            memberState.changeToFollower(term, leaderId);
            postProcessOfRoleChange(Role.FOLLOWER);
        }
    }

    /**
     * Notifies every registered role-change handler of the new role.
     *
     * @param role role this node has just switched to
     */
    private void postProcessOfRoleChange(Role role) {
        roleChangeHandlers.forEach(handler -> handler.changeRole(role));
    }

}
